{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Notebook for Training ResNet on The TinyImageNet dataset Using Multiple GPUs in the Ring-All-Reduce Architecture. Notebook (3/4) in the End-to-End Scalable Deep Learning Pipeline on Hops.\n",
    "\n",
    "This notebook will read the TFRecords that were written by notebook number 1 ([Notebook number one](./Step1_Save_to_Featurestore.ipynb)) and feed them into ResNet for multiple-GPU training with the ring-all-reduce architecture and the horovod library. \n",
    "\n",
    "Moreover it will read the hyperparameters produced by the distributed hyperparameter search in notebook number 2 ([Notebook number two](./Step2_Model_Training_Parallel_Experiments.ipynb))\n",
    "\n",
    "The distributed/parallel training over several GPUs is performed with the ring-all-reduce architecture.\n",
    "\n",
    "![step3.png](../images/step3.png)\n",
    "\n",
    "The ring-all-reduce architecture is network-optimal, utilizing all links between GPUs, rather than overloading a single link to a parameter server which easily can become a bottleneck."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Package Imports\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "import tensorflow as tf\n",
    "from tensorflow.keras.applications import ResNet50\n",
    "from hops import experiment, tensorboard, featurestore, hdfs\n",
    "from tensorflow import keras\n",
    "from tensorflow.python.keras.callbacks import TensorBoard\n",
    "import numpy as np\n",
    "import math\n",
    "import json\n",
    "from tensorflow.keras import metrics\n",
    "from tensorflow.keras.models import load_model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Constants"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "HEIGHT = 64\n",
    "WIDTH = 64\n",
    "CHANNELS = 3\n",
    "BATCH_SIZE = 100\n",
    "SHUFFLE_BUFFER_SIZE = 1000\n",
    "INPUT_SHAPE = 12288\n",
    "NUM_CLASSES = 200\n",
    "NUM_EPOCHS = 50\n",
    "LEARNING_RATE = 0.001\n",
    "TRAIN_DATASET = \"train_dataset_tinyimagenet\"\n",
    "TEST_DATASET = \"test_dataset_tinyimagenet\"\n",
    "VAL_DATASET = \"val_dataset_tinyimagenet\"\n",
    "MODEL_NAME = \"tinyimagenet_resnet50\"\n",
    "HYPERPARAMS_FILE = \"tinyimagenet_best_hyperparams.json\"\n",
    "VALIDATION_RESULTS_FILE = \"tinyimagenet_distributed_training_val_results.json\"\n",
    "MODEL_VERSION = 1"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "TRAIN_DATASET_SIZE = featurestore.get_training_dataset(TRAIN_DATASET).count()\n",
    "STEPS_PER_EPOCH = int(math.floor(float(TRAIN_DATASET_SIZE)/float(BATCH_SIZE)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%local\n",
    "HYPERPARAMS_FILE = \"tinyimagenet_best_hyperparams.json\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define Model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "def define_model():\n",
    "    \"\"\"\n",
    "    Defines the model to use for image classification. \n",
    "    \n",
    "    Uses a pre-defined ResNet50 implementation provided by Keras, but uses randomly initialized weights,\n",
    "    i.e pre-defined but not pre-TRAINED.\n",
    "    \n",
    "    Returns:\n",
    "           ResNet50 model\n",
    "    \"\"\"\n",
    "    model = ResNet50(weights=None, input_shape=(HEIGHT, WIDTH, CHANNELS), classes=NUM_CLASSES)\n",
    "    return model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define Model Input"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_tf_dataset(dataset_name, shuffle_buffer_size, batch_size, num_epochs):\n",
    "    \"\"\"\n",
    "    Creates a Tensorflow Dataset from TFRecords on HopsFS stored in the feature store.\n",
    "    \n",
    "    Args:\n",
    "        :dataset_name: name of the training dataset in the featurestore (TFRecords format)\n",
    "        :shuffle_buffer_size: size of the shuffle buffer in memory for shuffling the dataset\n",
    "        :batch_size: the size of the batch\n",
    "        :num_epochs: number of epochs to repeat the dataset\n",
    "    \n",
    "    Returns:\n",
    "           Tensorflow dataset\n",
    "    \"\"\"\n",
    "    \n",
    "    # Get Path and Schema from feature store metadata\n",
    "    tf_record_schema = featurestore.get_training_dataset_tf_record_schema(dataset_name)\n",
    "    dataset_dir = featurestore.get_training_dataset_path(dataset_name)\n",
    "    \n",
    "    input_files = tf.gfile.Glob(dataset_dir + \"/part-r-*\")\n",
    "    dataset = tf.data.TFRecordDataset(input_files)\n",
    "\n",
    "    def decode(example_proto):\n",
    "        example = tf.parse_single_example(example_proto, tf_record_schema)\n",
    "        label_int = example[\"label\"]\n",
    "        image_flat = example[\"image\"]\n",
    "        image = tf.reshape(image_flat, (HEIGHT,WIDTH,CHANNELS))\n",
    "        label = tf.one_hot(label_int, NUM_CLASSES)\n",
    "        return image, label\n",
    "\n",
    "    dataset = dataset.map(decode).batch(batch_size).shuffle(shuffle_buffer_size)\n",
    "    dataset = dataset.repeat(num_epochs)\n",
    "    # prefetch 1 batch to make bottleneck on GPU bandwidth less likely\n",
    "    dataset = dataset.prefetch(1)\n",
    "    return dataset"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define Train Loop"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [],
   "source": [
    "def export_model(classifier, version):\n",
    "    \"\"\"\n",
    "    Exports trained model \n",
    "    \n",
    "    Args:\n",
    "        :classifier: the model to export\n",
    "        :version: version of the model to export\n",
    "    \"\"\"\n",
    "    def _serving_input_receiver_fn():\n",
    "        # key (e.g. 'examples') should be same with the inputKey when you \n",
    "        # buid the request for prediction\n",
    "        receiver_tensors = {\"input_1\":tf.placeholder(dtype=tf.int64,shape=[1,HEIGHT,WIDTH,CHANNELS])}\n",
    "        return tf.estimator.export.ServingInputReceiver(receiver_tensors, receiver_tensors)\n",
    "    from hops import serving\n",
    "    from hops import hdfs\n",
    "    import os\n",
    "    local_export_dir = os.getcwd()\n",
    "    exported_path = classifier.export_savedmodel(local_export_dir, _serving_input_receiver_fn)\n",
    "    exported_path = exported_path.decode(\"utf-8\")\n",
    "    serving.export(exported_path, MODEL_NAME, version)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_fn(learning_rate):\n",
    "    \"\"\"\n",
    "    Defines the training loop:\n",
    "    \n",
    "    1. Get Model\n",
    "    2. Define custom metrics\n",
    "    3. Compile Model\n",
    "    4. Convert Keras model to TF Estimator\n",
    "    5. Fit model on train dataset\n",
    "    6. Evaluate model on validation dataset\n",
    "    7. Save validation results to HopsFS\n",
    "    8. Export trained model for serving\n",
    "    \"\"\"\n",
    "    # Tell Keras we are traning (in case it does different functionality between train/test time)\n",
    "    tf.keras.backend.set_learning_phase(True)\n",
    "\n",
    "    # 1. Get model\n",
    "    print(\"Defning the model\")\n",
    "    model = define_model()\n",
    "    print(\"Defining the model complete\")\n",
    "    \n",
    "    # 2. Define custom metrics\n",
    "    def top3_acc(x, y):\n",
    "        return metrics.top_k_categorical_accuracy(x, y, k=3)\n",
    "    \n",
    "    def top5_acc(x, y):\n",
    "        return metrics.top_k_categorical_accuracy(x, y, k=5)\n",
    "    \n",
    "    # 3. Compile the model\n",
    "    print(\"Compiling the model\")\n",
    "    model.compile(optimizer=tf.train.AdamOptimizer(learning_rate), loss='categorical_crossentropy',  \n",
    "                  metrics=['accuracy', metrics.mae,top3_acc,top5_acc])\n",
    "    print(\"Compiling the model complete\")\n",
    "    \n",
    "    # 4. Convert Keras model to TF Estimator\n",
    "    # Define DistributionStrategies and convert the Keras Model to an\n",
    "    # Estimator that utilizes these DistributionStrateges.\n",
    "    # Evaluator is a single worker, so using MirroredStrategy.\n",
    "    # Training is automatically distributed on all available GPUs when using MirroredStrategy\n",
    "    print(\"Convert keras model to a Tensorflow Estimator\")\n",
    "    run_config = tf.estimator.RunConfig(\n",
    "            train_distribute=tf.contrib.distribute.MirroredStrategy())\n",
    "    keras_estimator = tf.keras.estimator.model_to_estimator(keras_model=model, \n",
    "               config=run_config, model_dir=tensorboard.logdir())\n",
    "    print(\"Keras model to estimator conversion complete\")\n",
    "    \n",
    "    \n",
    "    # 5. Fit model on training dataset\n",
    "    print(\"Starting training...\")\n",
    "    tf.estimator.train_and_evaluate(keras_estimator, train_spec=tf.estimator.TrainSpec(\n",
    "        input_fn=lambda: create_tf_dataset(TRAIN_DATASET, SHUFFLE_BUFFER_SIZE, BATCH_SIZE, NUM_EPOCHS)),\n",
    "        eval_spec=tf.estimator.EvalSpec(\n",
    "            input_fn=lambda: create_tf_dataset(VAL_DATASET, SHUFFLE_BUFFER_SIZE, BATCH_SIZE, NUM_EPOCHS)))\n",
    "    print(\"Training complete\")\n",
    "    \n",
    "    # 6. Evalute model on validation dataset\n",
    "    print(\"Evaluating model on validation dataset\")\n",
    "    eval_results = keras_estimator.evaluate(lambda: create_tf_dataset(VAL_DATASET, SHUFFLE_BUFFER_SIZE, BATCH_SIZE, NUM_EPOCHS))    \n",
    "    val_top1acc = str(eval_results[\"accuracy\"])\n",
    "    val_top3acc = str(eval_results[\"top3_acc\"])\n",
    "    val_top5acc = str(eval_results[\"top5_acc\"])\n",
    "    validation_results = {\n",
    "        \"top1_acc\": val_top1acc,\n",
    "        \"val_top3_acc\": val_top3acc,\n",
    "        \"val_top5_acc\": val_top5acc\n",
    "    }\n",
    "    print(\"Evaluation complete\")\n",
    "    \n",
    "    # 7. Save validation results to HopsFS\n",
    "    print(\"Saving validation results to HopsFS..\")\n",
    "    val_results_path = hdfs.project_path() + \"Resources/\" + VALIDATION_RESULTS_FILE \n",
    "    hdfs.dump(json.dumps(validation_results), val_results_path)\n",
    "    print(\"Saving validation results complete\")\n",
    "    \n",
    "    \n",
    "    \n",
    "    # 8. Exporting the trained model\n",
    "    print(\"Exporting model...\")\n",
    "    export_model(keras_estimator, MODEL_VERSION)\n",
    "    print(\"Model exported\")\n",
    "    return val_top1acc"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Distributed Training Experiments\n",
    "\n",
    "To run this you should first have started your jupyter notebook server or job with the configuration \"Distributed Training\" and the MirroredStrategy and select the number of GPUs."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [],
   "source": [
    "BATCH_SIZE = 100\n",
    "NUM_EPOCHS = 1\n",
    "LEARNING_RATE = 0.01\n",
    "args_d = {}\n",
    "args_d[\"learning_rate\"] = [LEARNING_RATE]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Finished Experiment"
     ]
    }
   ],
   "source": [
    "experiment_result_path = experiment.launch(\n",
    "    train_fn, \n",
    "    args_dict = args_d,\n",
    "    name='tinyimagenet_resnet_distributed_training',\n",
    "    description=\"Training TinyImageNet Using Distributed Training\",\n",
    "    local_logdir=True\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}